
Conversation


@vgvoleg vgvoleg commented Sep 24, 2024

Pull request type

Please check the type of change your PR introduces:

  • Bugfix
  • Feature
  • Code style update (formatting, renaming)
  • Refactoring (no functional changes, no API changes)
  • Build related changes
  • Documentation content changes
  • Other (please describe):

What is the current behavior?

Issue Number: N/A

What is the new behavior?

Other information

@vgvoleg vgvoleg force-pushed the topic_batch_messages branch from 7aa3545 to 2e6ad26 Compare September 24, 2024 13:03
@github-actions

github-actions bot commented Sep 24, 2024

🌋 Here are results of SLO test for Python SDK over Table Service:

Grafana Dashboard

SLO-sync-python-table

@github-actions

github-actions bot commented Sep 24, 2024

🌋 Here are results of SLO test for Python SDK over Query Service:

Grafana Dashboard

SLO-sync-python-query

@vgvoleg vgvoleg force-pushed the topic_batch_messages branch from 24f684c to 60a4504 Compare September 24, 2024 14:53
@vgvoleg vgvoleg requested a review from rekby September 25, 2024 12:12
_state_changed: asyncio.Event
_closed: bool
_message_batches: typing.Deque[datatypes.PublicBatch]
_message_batches: typing.Dict[int, datatypes.PublicBatch]
Member

Suggested change
_message_batches: typing.Dict[int, datatypes.PublicBatch]
_message_batches: typing.Dict[int, datatypes.PublicBatch] # keys are partition session ID
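A minimal sketch of why the mapping's keys are partition session IDs (names like `Batch` and `add_batch` are hypothetical stand-ins for the SDK's `datatypes.PublicBatch` and its queueing logic): an incoming batch for an already-queued partition merges into the existing entry instead of creating a new one.

```python
from collections import OrderedDict
from dataclasses import dataclass, field
from typing import List

# Hypothetical stand-in for datatypes.PublicBatch.
@dataclass
class Batch:
    partition_session_id: int
    messages: List[str] = field(default_factory=list)

# Keys are partition session IDs; insertion order is preserved.
message_batches: "OrderedDict[int, Batch]" = OrderedDict()

def add_batch(batch: Batch) -> None:
    existing = message_batches.get(batch.partition_session_id)
    if existing is not None:
        # Same partition already queued: merge messages into its batch.
        existing.messages.extend(batch.messages)
    else:
        message_batches[batch.partition_session_id] = batch

add_batch(Batch(1, ["a"]))
add_batch(Batch(1, ["b"]))   # merges into partition 1's entry
add_batch(Batch(2, ["c"]))
```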

self._state_changed.clear()

def _get_first_batch(self) -> typing.Tuple[int, datatypes.PublicBatch]:
first_id, batch = self._message_batches.popitem(last=False)
Member

Rename first_id -> partition_session_id, to make clear what the number means.
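A toy illustration of the suggested rename (the mapping contents here are made up): `popitem(last=False)` on an `OrderedDict` removes the oldest entry in FIFO order, and naming the returned key `partition_session_id` documents what the integer is.

```python
from collections import OrderedDict

# Hypothetical mapping of partition session ID -> batch, in arrival order.
message_batches = OrderedDict([(17, "batch-A"), (42, "batch-B")])

# popitem(last=False) pops the oldest (first-inserted) entry.
partition_session_id, batch = message_batches.popitem(last=False)
```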

def _add_batch_to_queue(self, batch: datatypes.PublicBatch):
part_sess_id = batch._partition_session.id
if part_sess_id in self._message_batches:
self._message_batches[part_sess_id].messages.extend(batch.messages)
Member

What about implementing _push/_pop methods for PublicBatch? It will be easier to refactor the internals in the future.
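A minimal sketch of that suggestion (method names and the simplified `PublicBatch` here are assumptions, not the SDK's actual API): the batch type owns its message list, so callers never mutate `messages` directly and the internal representation can change later without touching call sites.

```python
from dataclasses import dataclass, field
from typing import List, Optional

@dataclass
class PublicBatch:
    # Simplified stand-in for the SDK's batch type.
    messages: List[str] = field(default_factory=list)

    def _push(self, other: "PublicBatch") -> None:
        # Absorb another batch's messages into this one.
        self.messages.extend(other.messages)

    def _pop_message(self) -> Optional[str]:
        # Remove and return the oldest message, or None when empty.
        return self.messages.pop(0) if self.messages else None

b = PublicBatch(["m1"])
b._push(PublicBatch(["m2", "m3"]))
```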

return len(stream_reader._message_batches[partition_session_id].messages)

initial_batches = batch_count()
initial_batch_size = batch_size() if not new_batch else 0
Member

What about inverting the condition?

Suggested change
initial_batch_size = batch_size() if not new_batch else 0
initial_batch_size = 0 if new_batch else batch_size()
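A tiny illustration of the inversion (`initial_size` is a hypothetical helper, not code from the PR): both forms are equivalent, but putting the special case first avoids the double negative `if not new_batch`.

```python
def initial_size(new_batch: bool, batch_size: int) -> int:
    # Suggested form: special case (new batch -> size 0) reads first.
    return 0 if new_batch else batch_size
```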

@vgvoleg vgvoleg merged commit 4877cc8 into main Sep 27, 2024
11 checks passed
@vgvoleg vgvoleg deleted the topic_batch_messages branch September 27, 2024 10:34


3 participants